package com.xnx3.obs.sources;

import com.huaweicloud.dis.DIS;
import com.huaweicloud.dis.exception.DISClientException;
import com.huaweicloud.dis.iface.data.request.GetPartitionCursorRequest;
import com.huaweicloud.dis.iface.data.request.GetRecordsRequest;
import com.huaweicloud.dis.iface.data.response.GetPartitionCursorResult;
import com.huaweicloud.dis.iface.data.response.GetRecordsResult;
import com.huaweicloud.dis.iface.data.response.Record;
import com.huaweicloud.dis.iface.stream.request.DescribeStreamRequest;
import com.huaweicloud.dis.iface.stream.response.DescribeStreamResult;
import com.huaweicloud.dis.util.JsonUtils;
import com.huaweicloud.dis.util.PartitionCursorTypeEnum;
import com.xnx3.obs.config.DISReaderConfig;
import com.xnx3.obs.config.HoodieWriterConfig;
import com.xnx3.obs.sink.HoodieObsSink;
import com.xnx3.obs.util.DISRecordUtil;
import com.xnx3.obs.util.DISUtil;
import com.xnx3.obs.util.SparkUtil;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hudi data source whose records come from DIS event notifications.
 * @author aly
 */
public class DISEventSource {

    private static final Logger LOGGER = LoggerFactory.getLogger(DISEventSource.class);

    // DIS client used to fetch partition cursors and records.
    // NOTE: previously a static field assigned through `this.` in the constructor,
    // so a second instance silently clobbered shared state — now per-instance.
    private final DIS dis;

    // Buffer of records (converted to strings) flushed to Hudi once per batch.
    private final List<String> records = new ArrayList<>();

    // Spark session used by the Hudi sink (also previously static — see above).
    private final SparkSession sparkSession;

    // Hudi OBS write tool (also previously static — see above).
    private final HoodieObsSink hoodieObsSink;

    public DISEventSource() {
        // init
        this.dis = DISUtil.getDISClient();
        this.sparkSession = SparkUtil.getSparkSession();
        this.hoodieObsSink = new HoodieObsSink();
    }

    /**
     * Fetch DIS notifications and write them to Hudi.
     * @param tableName Hudi target table name
     * @param basePath Hudi target base path
     * @param config Hudi run configuration (mutated: tableName/basePath are set on it)
     * @param dicConfig DIS run configuration
     */
    public void fetchEvents(String tableName, String basePath, HoodieWriterConfig config, DISReaderConfig dicConfig) {
        config.tableName(tableName).basePath(basePath);
        fetchEvents(config, dicConfig);
    }

    /**
     * Fetch DIS notifications and write them to Hudi.
     * Loops forever pulling record batches from the partition cursor; only an
     * exception breaks the loop. The Spark session is stopped exactly once, in
     * the finally block (previously it was stopped twice on failure paths —
     * once in the catch and again after the try).
     * @param config Hudi run configuration
     * @param dicConfig DIS run configuration
     */
    public void fetchEvents(HoodieWriterConfig config, DISReaderConfig dicConfig) {

        // Configured stream name
        String streamName = dicConfig.streamName();

        // Configured partition ID to download from
        String partitionId = dicConfig.partitionId();

        // Cursor type controls where consumption starts:
        // AT_SEQUENCE_NUMBER: from the given sequenceNumber (requires setStartingSequenceNumber)
        // AFTER_SEQUENCE_NUMBER: after the given sequenceNumber (requires setStartingSequenceNumber)
        // TRIM_HORIZON: from the oldest record
        // LATEST: from the latest record
        // AT_TIMESTAMP: from the given 13-digit timestamp (requires setTimestamp)
        String cursorType = dicConfig.cursorType();

        try {
            // Build the cursor request
            GetPartitionCursorRequest request = new GetPartitionCursorRequest();
            request.setStreamName(streamName);
            request.setPartitionId(partitionId);
            request.setCursorType(cursorType);
            // AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER need a starting sequence
            // number, clamped to the partition's valid range by cursorRange().
            if (PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name().equals(cursorType)
                    || PartitionCursorTypeEnum.AFTER_SEQUENCE_NUMBER.name().equals(cursorType)) {
                request.setStartingSequenceNumber(cursorRange(dicConfig));
            }
            // AT_TIMESTAMP needs the timestamp field.
            if (PartitionCursorTypeEnum.AT_TIMESTAMP.name().equals(cursorType)) {
                request.setTimestamp(dicConfig.timestamp());
            }

            // Resolve the initial partition cursor
            GetPartitionCursorResult response = dis.getPartitionCursor(request);
            String cursor = response.getPartitionCursor();

            LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

            // Record-fetch request, reused across iterations
            GetRecordsRequest recordsRequest = new GetRecordsRequest();
            GetRecordsResult recordResponse;

            while (true) {
                // Fetch the next batch at the current cursor
                recordsRequest.setPartitionCursor(cursor);
                recordResponse = dis.getRecords(recordsRequest);
                // Advance to the cursor for the following batch
                cursor = recordResponse.getNextPartitionCursor();

                // Convert each record and buffer it for the Hudi write
                for (Record record : recordResponse.getRecords()) {
                    records.add(DISRecordUtil.convertToString(record));

                    // Decode explicitly as UTF-8; the no-charset String constructor
                    // would use the platform default and garble non-ASCII payloads.
                    LOGGER.info("Get Record [{}], partitionKey [{}], sequenceNumber [{}].",
                            new String(record.getData().array(), StandardCharsets.UTF_8),
                            record.getPartitionKey(),
                            record.getSequenceNumber());
                }

                if (!records.isEmpty()) {
                    // Flush the batch to Hudi
                    hoodieObsSink.writeHudi(sparkSession, config, records);
                    // Clear consumed records so the next batch starts fresh
                    records.clear();
                }
            }
        } catch (DISClientException e) {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                    e.getMessage(),
                    e);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            // Stop the Spark session exactly once, on every exit path
            sparkSession.stop();
        }
    }

    /**
     * Query DIS channel (stream) details via the Huawei API.
     * @param dicConfig DIS run configuration
     * @return {@link DescribeStreamResult}
     */
    public DescribeStreamResult describeStream(DISReaderConfig dicConfig) {
        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        // Set channel name
        describeStreamRequest.setStreamName(dicConfig.streamName());
        return dis.describeStream(describeStreamRequest);
    }

    /**
     * Clamp the configured starting sequence number into the partition's
     * available range.
     *  If the configured value is within the range, it is returned as-is.
     *  If it falls outside the range, the nearest range bound is returned.
     * @param dicConfig DIS run configuration
     * @return cursor position as a decimal string
     */
    public String cursorRange(DISReaderConfig dicConfig) {
        String sequenceNumberRange = describeStream(dicConfig).getPartitions().get(0).getSequenceNumberRange();
        // The range arrives as "[start : end]"; swapping ':' for ',' lets it
        // parse as a JSON array of two numbers.
        List<Integer> arr = JsonUtils.jsonToObj(sequenceNumberRange.trim().replace(":", ","), List.class);
        // Parse as long (sequence numbers can exceed Integer.MAX_VALUE) and
        // compare directly instead of subtracting, which could overflow.
        long configured = Long.parseLong(dicConfig.startingSequenceNumber());
        if (configured < arr.get(0)) {
            return String.valueOf(arr.get(0));
        }
        if (configured > arr.get(1)) {
            return String.valueOf(arr.get(1));
        }
        // Within range: return the configured value unchanged
        return dicConfig.startingSequenceNumber();
    }

}
